Flink × TiDB Real-Time ETL: Notes on Pitfalls

Authors
  • Wang Zhiwei

Flink Job Exceptions

java.io.IOException: Corrupt Maxwell JSON message '{"database":"tffi","table":"product","type":"delete","ts":1660102097,"old":{"id":1,"name":"scoot"}}'.
	at org.apache.flink.formats.json.maxwell.MaxwellJsonDeserializationSchema.deserialize(MaxwellJsonDeserializationSchema.java:141)

Details

2022-08-10 03:30:10,097 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata [] - [Consumer clientId=consumer-300, groupId=null] Cluster ID: pku7rMr1RxecrRDqbt9lGA
2022-08-10 03:30:10,100 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: TableSourceScan(table=[[default_catalog, default_database, product]], fields=[id, name]) -> DropUpdateBefore -> Sink: Sink(table=[default_catalog.default_database.sink_product], fields=[id, name]) (1/1)#107 (c2c2ed5aa949a7a7832b2f80fb35ae4f) switched from RUNNING to FAILED.
java.io.IOException: Corrupt Maxwell JSON message '{"database":"tffi","table":"product","type":"delete","ts":1660102097,"old":{"id":1,"name":"scoot"}}'.
	at org.apache.flink.formats.json.maxwell.MaxwellJsonDeserializationSchema.deserialize(MaxwellJsonDeserializationSchema.java:141) ~[flink-json-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[flink-sql-connector-kafka_2.11-1.12.2%20(1).jar:1.12.2]
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179) ~[flink-sql-connector-kafka_2.11-1.12.2%20(1).jar:1.12.2]
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) ~[flink-sql-connector-kafka_2.11-1.12.2%20(1).jar:1.12.2]
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) ~[flink-sql-connector-kafka_2.11-1.12.2%20(1).jar:1.12.2]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
Caused by: java.lang.NullPointerException
	at org.apache.flink.formats.json.maxwell.MaxwellJsonDeserializationSchema.deserialize(MaxwellJsonDeserializationSchema.java:126) ~[flink-json-1.12.2.jar:1.12.2]
	... 7 more

Differences in the Maxwell data format across systems

Maxwell messages emitted by TiCDC

INSERT
{
   "database": "tffi",
   "table": "product",
   "type": "insert",
   "ts": 1660101830,
   "data": {
      "id": 4,
      "name": "hammer"
   }
}

UPDATE
{
   "database": "tffi",
   "table": "product",
   "type": "update",
   "ts": 1660102087,
   "data": {
      "id": 1,
      "name": "scoot"
   },
   "old": {
      "name": "scooter"
   }
}

DELETE
{
   "database": "tffi",
   "table": "product",
   "type": "delete",
   "ts": 1660102097,
   "old": {
      "id": 1,
      "name": "scoot"
   }
}

The official Maxwell data format

https://maxwells-daemon.io/dataformat/

DELETE
{
   "database":"test",
   "table":"e",
   "type":"delete",
   ...
   "data":{
      "id":1,
      "m":5.444,
      "c":"2016-10-21 05:33:54.631000",
      "comment":"I am a creature of light."
   }
}

after a DELETE, data contains a copy of the row, just before it shuffled off this mortal coil.

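The difference

For DELETE messages, official Maxwell puts the deleted row in the data field, whereas TiCDC puts it in the old field and emits no data field at all.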
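To make the difference concrete, here is a minimal Go sketch that reports which field of a DELETE message carries the row. The type maxwellMessage and the function deletedRowField are hypothetical names introduced for illustration; they model only the fields shown in the examples above and belong to neither Maxwell nor TiCDC.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// maxwellMessage is a hypothetical type that models only the fields
// shown in the example messages above.
type maxwellMessage struct {
	Database string                 `json:"database"`
	Table    string                 `json:"table"`
	Type     string                 `json:"type"`
	Data     map[string]interface{} `json:"data"`
	Old      map[string]interface{} `json:"old"`
}

// deletedRowField reports which field of a DELETE message carries the row:
// "data" (official Maxwell), "old" (TiCDC), or "" if neither is present.
func deletedRowField(raw []byte) (string, error) {
	var m maxwellMessage
	if err := json.Unmarshal(raw, &m); err != nil {
		return "", err
	}
	if m.Type != "delete" {
		return "", fmt.Errorf("not a delete message: type=%q", m.Type)
	}
	switch {
	case m.Data != nil:
		return "data", nil
	case m.Old != nil:
		return "old", nil
	default:
		return "", nil
	}
}

func main() {
	// The TiCDC message from the exception and a trimmed official example.
	ticdc := []byte(`{"database":"tffi","table":"product","type":"delete","ts":1660102097,"old":{"id":1,"name":"scoot"}}`)
	official := []byte(`{"database":"test","table":"e","type":"delete","data":{"id":1,"m":5.444}}`)
	for _, raw := range [][]byte{ticdc, official} {
		field, err := deletedRowField(raw)
		fmt.Println(field, err)
	}
}
```

Run against the two samples, the function should name old for the TiCDC message and data for the official one.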

Digging into the source code

Flink

MaxwellJsonDeserializationSchema.java - apache/flink - Sourcegraph

            RowData row = jsonDeserializer.deserialize(message);
            String type = row.getString(2).toString(); // "type" field
            ...
            else if (OP_DELETE.equals(type)) {
                // "data" field is a row, contains deleted rows
                RowData delete = row.getRow(0, fieldCount);
                delete.setRowKind(RowKind.DELETE);
                out.collect(delete);
            }

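Flink's Maxwell format follows the official Maxwell format: for DELETE messages it reads the row from the data field. A TiCDC DELETE message has no data field, so row.getRow(0, fieldCount) most likely returns null and the following setRowKind call fails, which would explain the NullPointerException in the stack trace above.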
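The failure can be reproduced in miniature. The Go sketch below is a simplified model of the DELETE branch, not Flink's actual code: envelope, errNoData and readDeleteLikeFlink are hypothetical names, and the explicit error stands in for the point where the Java code would dereference a null row.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// envelope keeps only the three fields the Flink snippet reads.
type envelope struct {
	Data map[string]interface{} `json:"data"`
	Old  map[string]interface{} `json:"old"`
	Type string                 `json:"type"`
}

// errNoData stands in for the NullPointerException of the Java code.
var errNoData = errors.New(`delete message has no "data" field`)

// readDeleteLikeFlink mimics the DELETE branch above: it looks only at
// "data" and never falls back to "old".
func readDeleteLikeFlink(raw []byte) (map[string]interface{}, error) {
	var e envelope
	if err := json.Unmarshal(raw, &e); err != nil {
		return nil, err
	}
	if e.Type != "delete" {
		return nil, fmt.Errorf("unexpected type %q", e.Type)
	}
	if e.Data == nil {
		// The Java code has no such check, so it fails on the null row.
		return nil, errNoData
	}
	return e.Data, nil
}

func main() {
	// The exact message from the exception at the top of this post.
	raw := []byte(`{"database":"tffi","table":"product","type":"delete","ts":1660102097,"old":{"id":1,"name":"scoot"}}`)
	_, err := readDeleteLikeFlink(raw)
	fmt.Println(err)
}
```

The row is present in the message the whole time; it just sits in a field this branch never reads.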

TiCDC

maxwell.go - pingcap/tiflow - Sourcegraph

if e.IsDelete() {
	value.Type = "delete"
	for _, v := range e.PreColumns {
		switch v.Type {
		case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
			if v.Value == nil {
				value.Old[v.Name] = nil
			} else if v.Flag.IsBinary() {
				value.Old[v.Name] = v.Value
			} else {
				value.Old[v.Name] = string(v.Value.([]byte))
			}
		default:
			value.Old[v.Name] = v.Value
		}
	}
}
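Since this encoder writes the deleted row into Old, one conceivable consumer-side workaround is to move old into data before the message reaches Flink. The Go sketch below illustrates that idea only; normalizeDelete is a hypothetical helper, and where it would run (for example a small relay between two Kafka topics) is an assumption rather than something this post describes.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// normalizeDelete rewrites a TiCDC-style DELETE message so the deleted row
// sits under "data", as in the official Maxwell format. Other messages,
// and deletes that already carry "data", are returned unchanged.
func normalizeDelete(raw []byte) ([]byte, error) {
	var msg map[string]json.RawMessage
	if err := json.Unmarshal(raw, &msg); err != nil {
		return nil, err
	}
	var typ string
	if t, ok := msg["type"]; ok {
		if err := json.Unmarshal(t, &typ); err != nil {
			return nil, err
		}
	}
	_, hasData := msg["data"]
	old, hasOld := msg["old"]
	if typ != "delete" || hasData || !hasOld {
		return raw, nil
	}
	// Move the row image from "old" to "data"; all other keys are kept.
	msg["data"] = old
	delete(msg, "old")
	return json.Marshal(msg)
}

func main() {
	raw := []byte(`{"database":"tffi","table":"product","type":"delete","ts":1660102097,"old":{"id":1,"name":"scoot"}}`)
	out, err := normalizeDelete(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

Keeping the values as json.RawMessage means the row is moved byte for byte, so numeric and string column values are not re-interpreted along the way. Note that json.Marshal emits map keys in sorted order, so the key order of a rewritten message differs from the input.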

Solution


References

https://ververica.github.io/flink-cdc-connectors/release-2.1/content/formats/changelog-json.html

https://github.com/ververica/flink-cdc-connectors/blob/release-2.1/flink-format-changelog-json/src/main/java/com/ververica/cdc/formats/json/ChangelogJsonFormatFactory.java